在 Day 14 我們初步認識了 TableProvider
作為數據源抽象的概念,Day 17 也看到了它在謂詞下推中的實際應用。今天我們將深入探討 TableProvider 的內部機制和優化策略,了解 DataFusion 如何實現高效的數據讀取。
所有查詢的終點都需要從實際的數據源讀取數據。TableProvider 機制正是 DataFusion 靈活性的關鍵——透過統一的介面整合各種數據源,並將優化能力下推到數據源層面。
今日學習重點:
我們在 Day 14 已經認識了 TableProvider trait 的基本結構,其核心是 scan()
方法。今天我們深入探討這個方法如何實現數據源的靈活讀取和優化。
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>, // 投影下推
filters: &[Expr], // 謂詞下推
limit: Option<usize>, // 限制下推
) -> Result<Arc<dyn ExecutionPlan>>;
這個方法的設計巧妙之處在於:透過三個可選參數(projection、filters、limit),讓數據源可以在讀取數據之前就進行優化,避免不必要的 I/O 和計算。
┌─────────────────────────────────────────────────────────────┐
│ Physical Planner │
│ │
│ 需要從 users 表讀取數據: │
│ SELECT id, name FROM users WHERE age > 18 LIMIT 100 │
└─────────────────────────────────────────────────────────────┘
│
▼
呼叫 scan() 方法
│
▼
┌─────────────────────────────────────────────────────────────┐
│ TableProvider │
│ │
│ 參數解析: │
│ • projection: Some([0, 1]) → 只需要第0和第1列 │
│ • filters: [age > 18] → 過濾條件 │
│ • limit: Some(100) → 最多返回100行 │
└─────────────────────────────────────────────────────────────┘
│
▼
創建對應的 ExecutionPlan
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 返回 ExecutionPlan │
│ │
│ 例如:ParquetExec, CsvExec, DataSourceExec 等 │
└─────────────────────────────────────────────────────────────┘
1. projection(投影下推)
projection
參數是一個列索引的陣列,表示查詢只需要這些列。這是「Projection Pushdown」優化的體現:
// 假設表有 5 列:[id, name, age, email, address]
// SELECT name, age FROM users
projection: Some(&vec![1, 2]) // 只需要第1列(name)和第2列(age)
對於列式存儲格式(如 Parquet),這個優化非常重要。如果表有 100 列但查詢只需要 2 列,我們可以只讀取這 2 列的數據,大幅減少 I/O。
2. filters(謂詞下推)
filters
參數包含過濾條件,這是「Filter Pushdown」(謂詞下推)優化:
// SELECT * FROM users WHERE age > 18 AND city = 'Taipei'
filters: &[
Expr::BinaryExpr(age > 18),
Expr::BinaryExpr(city = 'Taipei')
]
不同的數據源可以用不同的方式利用這些過濾條件:
3. limit(限制下推)
limit
參數告訴數據源最多需要多少行數據:
// SELECT * FROM users LIMIT 100
limit: Some(100)
這個參數特別適合:
在 Day 14 我們知道 ListingTable
用於讀取檔案系統中的數據檔案。現在讓我們深入它的內部結構,了解它如何實現高效的檔案掃描和優化。
pub struct ListingTable {
table_paths: Vec<ListingTableUrl>,
file_schema: SchemaRef, // 檔案中實際的欄位
table_schema: SchemaRef, // file_schema + 分區列
options: ListingOptions,
collected_statistics: FileStatisticsCache, // 統計資訊快取
constraints: Constraints,
column_defaults: HashMap<String, Expr>,
// ...
}
這個結構有幾個關鍵設計:
1. 雙重 Schema 設計
file_schema
:檔案中實際存儲的欄位table_schema
:加上從目錄路徑解析的分區列例如對於路徑 /data/year=2024/month=01/data.parquet
,即使檔案裡沒有 year
和 month
欄位,table_schema
也會包含這些分區列。
2. 分區表支援
ListingTable 支援 Hive 風格的分區表結構:
/data/
├── year=2023/
│ ├── month=01/
│ │ ├── data1.parquet
│ │ └── data2.parquet
│ └── month=02/
│ └── data3.parquet
└── year=2024/
└── month=01/
└── data4.parquet
當查詢包含分區列的條件時(如 WHERE year = 2024
),ListingTable 可以直接跳過不相關的分區目錄,這就是 Partition Pruning(分區剪枝),我們稍後會詳細探討。
3. 統計資訊快取機制
FileStatisticsCache
是 ListingTable 的關鍵優化:
// 快取內容示例
FileStatistics {
num_rows: Some(1_000_000), // 總行數
total_byte_size: Some(45_000_000), // 檔案大小
column_statistics: [
ColumnStatistics {
null_count: Some(0),
max_value: Some(ScalarValue::Int32(100)),
min_value: Some(ScalarValue::Int32(1)),
// ...
},
// ... 其他列的統計
],
}
這個快取的作用:
MemTable:將 Vec<RecordBatch>
直接存儲在記憶體中。它的 scan() 實作非常直接,主要特點是忽略 filters 參數(因為數據已在記憶體,過濾成本低),並且支援預先定義的 sort_order
來告訴執行引擎數據已排序。
StreamingTable:用於無界流式數據源,不支援大多數下推優化(因為流式數據無法預先統計),更多作為與外部流處理系統的適配層。
分區剪枝是一個非常重要的優化技術。讓我們透過一個具體的例子來理解。
假設我們有一個按日期分區的銷售數據表:
/sales/
├── year=2023/month=01/ → 包含 100 個檔案
├── year=2023/month=02/ → 包含 100 個檔案
├── ...
└── year=2024/month=12/ → 包含 100 個檔案
當執行查詢:
SELECT * FROM sales
WHERE year = 2024 AND month >= 10
LIMIT 100
不使用剪枝的情況下,需要:
使用剪枝後,只需要:
year = 2024 AND month >= 10
計算出符合的分區2024/10
、2024/11
、2024/12
ListingTable 的剪枝邏輯大致如下:
┌─────────────────────────────────────────────────────────────┐
│ 1. 解析查詢中的分區列過濾條件 │
│ WHERE year = 2024 AND month >= 10 │
│ → 提取出 partition_filters │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 2. 列舉所有分區路徑 │
│ 掃描目錄結構,找到所有分區 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 3. 對每個分區套用過濾條件 │
│ year=2023/month=01 → 不符合(year ≠ 2024) │
│ year=2024/month=09 → 不符合(month < 10) │
│ year=2024/month=10 → 符合 ✓ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ 4. 只對保留的分區列舉檔案 │
│ 減少檔案系統操作,提升性能 │
└─────────────────────────────────────────────────────────────┘
在 DataFusion 的實作中,這個邏輯主要在 pruned_partition_list()
函數中:
pub async fn pruned_partition_list<'a>(
ctx: &'a dyn Session,
store: &'a dyn ObjectStore,
table_path: &'a ListingTableUrl,
filters: &'a [Expr],
file_extension: &'a str,
partition_cols: &'a [(String, DataType)],
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
// 如果沒有分區列,直接列出所有檔案
if partition_cols.is_empty() {
return Ok(/* 列出所有檔案 */);
}
// 列舉所有分區
let partitions = list_partitions(
store,
table_path,
partition_cols.len(),
partition_prefix
).await?;
// 套用過濾條件進行剪枝
let pruned = prune_partitions(
table_path,
partitions,
filters,
partition_cols
).await?;
// 返回剪枝後的檔案列表
Ok(/* stream of files from pruned partitions */)
}
除了分區剪枝,TableProvider 還可以利用 filters
和 projection
參數進行更細緻的優化。
以 Parquet 格式為例,假設我們有一個包含 50 列的用戶行為表:
-- 表結構:user_id, session_id, timestamp, event_type, ... (共50列)
SELECT user_id, event_type FROM user_events
不使用 Projection Pushdown:
使用 Projection Pushdown:
projection: Some([0, 3])
Filter Pushdown 在不同層次都可以發揮作用:
1. 邏輯層面(Optimizer)
在查詢優化階段,PushDownFilter
規則會嘗試將 Filter 下推到 TableScan:
Before: After:
┌─────────────┐ ┌─────────────┐
│ Filter │ │ TableScan │
│ age > 18 │ │ filters = │
└─────────────┘ │ [age > 18] │
▲ └─────────────┘
│
┌─────────────┐
│ TableScan │
└─────────────┘
2. 物理層面(ExecutionPlan)
在物理執行時,DataSourceExec 可以利用 filters:
3. 檔案格式層面
以 Parquet 為例,filter 可以在多個層次生效:
Parquet 檔案結構:
┌────────────────────────────────────────┐
│ Footer (File Metadata) │ ← 檢查整個檔案的統計資訊
│ - 所有 column 的 min/max │
│ - Row group 數量 │
└────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────┐
│ Row Group 1 (Metadata) │ ← 檢查 row group 統計
│ - 每個 column 的 min/max │
│ - 行數:1,000,000 │
└────────────────────────────────────────┘
│
▼
┌────────────────────────────────────────┐
│ Column Chunk (age column) │
│ ┌──────────────────────────────────┐ │
│ │ Page 1: min=18, max=25 │ │ ← 可能包含符合條件的數據
│ │ Page 2: min=5, max=15 │ │ ← 全部 < 18,跳過!
│ │ Page 3: min=30, max=45 │ │ ← 可能包含符合條件的數據
│ └──────────────────────────────────┘ │
└────────────────────────────────────────┘
對於查詢 WHERE age > 18
:
這種多層次的過濾大幅減少了實際讀取和處理的數據量。
TableProvider 可以透過 supports_filters_pushdown
方法告訴優化器哪些過濾條件可以下推:
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> Result<Vec<TableProviderFilterPushDown>> {
filters.iter().map(|filter| {
// 檢查這個過濾條件是否可以下推
if can_pushdown(filter) {
TableProviderFilterPushDown::Exact // 精確過濾
} else {
TableProviderFilterPushDown::Unsupported // 不支援
}
}).collect()
}
返回值有三種可能:
今天我們深入探討了 TableProvider 的實作細節和優化機制:
實作機制:
多層次優化策略:
明天我們將探討 Parquet 讀取優化的細節,深入了解列式存儲格式如何在多個層次實現過濾和投影優化。